#!/usr/bin/env python

'Push new and changed files to a DDFS.'

# Copyright (c) Los Alamos National Security, LLC, and others.

# Extra help text; passed to argparse.ArgumentParser below as the epilog.
help_epilogue = '''
The default behavior is to push a single replica of each file. On the next
garbage collection, DDFS will rebalance and bring each file up to the proper
number of replicas; note that this can be an expensive operation which
degrades the performance of jobs if you pushed a lot of data. If you prefer to
incur this cost up front, use --safer.

Be aware that this script depends on undocumented parts of DDFS.
'''

# Note: This script is a lot longer than it might otherwise be because DDFS
# does not provide a "remove blob from tag" operation, only "replace the list
# of blobs for a tag".

import argparse
from datetime import datetime
import itertools
import os.path
import re
import time

import disco.ddfs
import disco.error

import testable
import time_
import u


# Regex which matches the original filename part of a DDFS URL. Group 1 is the
# text after a "/" and before a following literal "$".
# e.g., u'disco://localhost/ddfs/vol0/blob/13/2012-10-31_tsv$554-51b92-c8a7c'
# yields u'2012-10-31_tsv' as group 1.
URL_FILENAME_RE = re.compile(r'^.*/(.+)\$')


# Command-line interface. The module docstring is the description and
# help_epilogue the trailing notes; RawTextHelpFormatter keeps the line
# breaks of the help text as written.
ap = argparse.ArgumentParser(
   description=__doc__,
   epilog=help_epilogue,
   formatter_class=argparse.RawTextHelpFormatter)

# Optional switches.
ap.add_argument('--safer', action='store_true',
                help='push standard number of replicas instead of 1')
# nargs=0: the switch takes no value; the custom action does the work.
ap.add_argument('--unittest', nargs=0,
                action=testable.Raise_Unittest_Exception,
                help='run unit tests instead of doing anything real')
# NOTE(review): main() in this file never reads args.verbose; presumably it
# is consumed by u.parse_args() or u.logging_init() -- confirm.
ap.add_argument('--verbose', action='store_true',
                help='be more verbose, wait until job finishes before exiting')

# Positional arguments: the destination tag, then one or more files.
ap.add_argument('tag', metavar='TAG', help='destination tag')
ap.add_argument('files', metavar='FILE', nargs='+', help='file to push')


# Note that URL lists are lists of lists: each sub-list is a replication set.

def main():
   '''Push the files named in args.files to the DDFS tag args.tag.

      Steps:

        1. Fetch the tag to learn its last-modified time and current URLs. A
           404 from DDFS means the tag does not exist yet and will be created;
           any other communication error propagates.

        2. Abort unless every URL in the tag starts with disco:// (i.e., the
           tag is a leaf tag).

        3. For each file: if it is already in the tag and its mtime is older
           than the tag's timestamp, keep the existing replica set; otherwise
           push it (1 replica, or the DDFS default if --safer).

        4. Replace the tag's URL list with exactly the replica sets collected
           in step 3. Note that replica sets in the old tag which correspond
           to no file on the command line are therefore not carried over.

      Uses the module-level names args (parsed arguments) and l (logger),
      which must be set before calling.'''
   l.info('starting')
   t_start = time.time()
   fs = disco.ddfs.DDFS()
   # stat the tag
   try:
      tag = fs.get(args.tag)
      tag_timestamp = time_.ddfs_parse(tag['last-modified'])
      tag_old_urls = tag['urls']
      l.info('tag %s last modified %s' % (args.tag, tag_timestamp.isoformat()))
   except disco.error.CommError as x:
      # Only "not found" is expected here; anything else is a real failure.
      if (x.code != 404): raise
      l.info('tag %s does not exist, will create' % (args.tag))
      # Minimum timestamp so that no file compares as older than the tag.
      tag_timestamp = time_.datetime_min
      tag_old_urls = list()
   # verify that it's a leaf tag
   for url in itertools.chain.from_iterable(tag_old_urls):
      if (not url.startswith('disco://')):
         u.abort('%s is not a leaf tag: contains URL %s' % (args.tag, url))
   # push files that need pushing
   new_ct = 0
   replaced_ct = 0
   unchanged_ct = 0
   bytes_uploaded = 0
   tag_new_urls = list()
   for filename in args.files:
      # NOTE(review): fromtimestamp() yields a naive local-time datetime;
      # confirm that time_.ddfs_parse() returns values comparable to it.
      file_timestamp = datetime.fromtimestamp(os.path.getmtime(filename))
      l.debug('%s last modified: %s' % (filename, file_timestamp.isoformat()))
      # Each replica set holds copies of one blob, so matching against the
      # first replica is enough to identify the set.
      replica_sets = [reps for reps in tag_old_urls
                      if file_url_match(filename, reps[0])]
      l.debug('%s existing replica sets: %d' % (filename, len(replica_sets)))
      assert (0 <= len(replica_sets) <= 1)
      if (file_timestamp < tag_timestamp and len(replica_sets) == 1):
         l.debug('%s already present and unchanged, skipping' % (filename))
         unchanged_ct += 1
         replicas = replica_sets[0]
      else:
         if (len(replica_sets) == 0):
            l.info('%s does not exist in tag, pushing' % (filename))
            if (file_timestamp < tag_timestamp):
               l.warning('%s is older but does not exist in tag' % (filename))
            new_ct += 1
         else:
            l.info('%s is newer than tag, pushing replacement' % (filename))
            replaced_ct += 1
         bytes_uploaded += os.path.getsize(filename)
         # replicas=None lets DDFS use its default replica count. The [1][0]
         # picks the replica set of the single pushed file out of push()'s
         # return value (undocumented DDFS behavior this script relies on).
         replicas = fs.push(args.tag, [filename],
                            replicas=(None if args.safer else 1))[1][0]
      assert (all(file_url_match(filename, rep) for rep in replicas))
      tag_new_urls.append(replicas)
   # set urls for tag; this is what drops the superseded blobs, since push()
   # above leaves the old URLs in place
   l.debug('resetting URLs for %s' % (args.tag))
   fs.put(args.tag, tag_new_urls)
   # done
   l.info('%d new files, %d replaced, %d unchanged'
          % (new_ct, replaced_ct, unchanged_ct))
   duration = time.time() - t_start
   # Guard the rate against a zero elapsed time (coarse clocks, nothing to
   # do) rather than dying with ZeroDivisionError after the work is done.
   rate = (bytes_uploaded / duration if duration > 0 else 0)
   l.info('%s uploaded in %s (%s / s)'
          % (u.fmt_bytes(bytes_uploaded), u.fmt_seconds(duration),
             u.fmt_bytes(rate)))

def file_url_match(filename, url):
   '''Return True if the given filename could have been canonized to the given
      DDFS URL, False otherwise. Only the basename of filename is considered;
      it is transformed with DDFS.safe_name() and compared with the filename
      part of url (the text between a "/" and the following "$"). Note that
      this is not a 1-to-1 transformation. E.g.:

      >>> file_url_match('/foo/2012-10-31.all.tsv.1000', u'disco://localhost/ddfs/vol0/blob/13/2012-10-31_all_tsv_1000$554-51b92-c8a7c')
      True
      >>> file_url_match('/foo/2012-11-01.all.tsv.1000', u'disco://localhost/ddfs/vol0/blob/13/2012-10-31_all_tsv_1000$554-51b92-c8a7c')
      False

      A URL with no filename part (no "$" after a "/") matches nothing:

      >>> file_url_match('/foo/bar', u'disco://localhost/ddfs/vol0/blob/13/bar')
      False'''
   xform_name = disco.ddfs.DDFS.safe_name(os.path.basename(filename))
   m = URL_FILENAME_RE.search(url)
   if (m is None):
      # No "/name$" component, so there is nothing the filename could equal.
      return False
   return (xform_name == m.group(1))


# Module entry point. Arguments are parsed and logging is set up at module
# level so that main() can use the global names args and l; main() itself
# only runs when the file is executed as a script.
try:
   args = u.parse_args(ap)
   l = u.logging_init('mpush')

   if (__name__ == '__main__'):
      main()
except testable.Unittests_Only_Exception:
   # Presumably raised while parsing arguments when --unittest is given (its
   # action is testable.Raise_Unittest_Exception) -- TODO confirm. Nothing is
   # pushed in that case; testable.register('') is called instead.
   testable.register('')
